fix(tunnel): fix ups race condition #2858
Conversation
Warning: this pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
```rust
if let Some(sub) = driver.subscriptions.get(&subject).await {
    if sub.tx.receiver_count() == 0 {
        driver.subscriptions.invalidate(&subject).await;

        let mut hasher = DefaultHasher::new();
        subject.hash(&mut hasher);
        let subject_hash = BASE64.encode(&hasher.finish().to_be_bytes());

        let sql = format!("UNLISTEN {}", quote_ident(&subject_hash));
        let unlisten_res = driver.client.batch_execute(&sql).await;

        if let Err(err) = unlisten_res {
            tracing::error!(%subject, ?err, "failed to unlisten subject");
        }
    }
}
```
There appears to be a race condition in the subscription cleanup logic. Between checking `sub.tx.receiver_count() == 0` and performing the invalidation and `UNLISTEN` operations, new receivers could be added to the subscription. This creates a time-of-check-to-time-of-use (TOCTOU) window in which an active subscription might be incorrectly terminated.
Consider using a more atomic approach for this cleanup operation, such as:
- Acquiring a lock before checking the receiver count
- Using a compare-and-swap pattern if available
- Moving this logic into a synchronized context where subscription additions are also managed
This would prevent the scenario where a subscription is invalidated while new clients are attempting to use it.
Spotted by Diamond
```rust
let (client, mut conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?;
tokio::spawn(async move {
    // NOTE: This loop will stop automatically when client is dropped
    loop {
        match poll_fn(|cx| conn.poll_message(cx)).await {
            Some(Ok(AsyncMessage::Notification(note))) => {
                if let Some(sub) = subscriptions2.get(note.channel()).await {
                    let env = match serde_json::from_str::<Envelope>(note.payload()) {
                        Ok(env) => env,
                        Err(err) => {
                            tracing::error!(?err, "failed deserializing envelope");
                            break;
                        }
                    };
                    let payload = match BASE64
                        .decode(env.payload)
                        .context("invalid base64 payload")
                    {
                        Ok(p) => p,
                        Err(err) => {
                            tracing::error!(?err, "failed decoding base64 payload");
                            break;
                        }
                    };

                    let _ = sub.tx.send((payload, env.reply_subject));
                }
            }
            Some(Ok(_)) => continue,
            Some(Err(err)) => {
                tracing::error!(?err, "ups poll loop failed");
                break;
            }
            None => break,
        }
    }

    tracing::info!("ups poll loop stopped");
});
```
The error handling in this connection polling task needs improvement to prevent resource leaks and silent failures. Currently, when envelope deserialization or base64 decoding fails, the task breaks out of the loop and terminates without proper cleanup or recovery. This creates several issues:
- The connection remains in an undefined state when the task terminates early
- Message processing stops permanently with no notification to subscribers
- The `client` connection resource may leak if the task exits unexpectedly
Consider implementing:
- Robust error handling that logs but continues processing other messages
- A reconnection mechanism if the connection fails
- A way to notify the system when message delivery is compromised
- Proper resource cleanup in all termination paths
This would make the system more resilient to transient errors and prevent silent degradation of the pub/sub functionality.
Spotted by Diamond
No description provided.